package com.oracle.tag

import com.dmp.config.ConfigHandler
import com.dmp.util.{JPool, TagUtil}
import org.apache.commons.lang.StringUtils
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.Jedis

import scala.collection.mutable

/**
 * Spark job that derives per-user tag counts from ad-log parquet data.
 *
 * Each row accepted by `TagUtil.userIfFilterCondition` is turned into a set of tags
 * (ad space type/name, app, channel, OS, network, carrier, keywords, province, city);
 * tag counts are then summed per user id and printed.
 */
object DataLabel {

  /** Operating system code (`client` column) -> tag code. */
  private val ClientTags: Map[Int, String] = Map(
    1 -> "D00010001",
    2 -> "D00010002",
    3 -> "D00010003"
  )

  /** Network type (upper-cased `networkmannername` column) -> tag code. */
  private val NetworkTags: Map[String, String] = Map(
    "WIFI" -> "D00020001",
    "4G" -> "D00020002",
    "3G" -> "D00020003",
    "2G" -> "D00020004"
  )

  /** Carrier name (`ispname` column: China Mobile / China Unicom / China Telecom) -> tag code. */
  private val IspTags: Map[String, String] = Map(
    "移动" -> "D00030001",
    "联通" -> "D00030002",
    "电信" -> "D00030003"
  )

  /**
   * Entry point. Reads `ConfigHandler.parquetFilePath`, tags every filtered row,
   * sums tag counts per user and prints `(userId, tagCounts)` pairs.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val conf: SparkConf = new SparkConf()
      .setAppName("数据标签化")
      // Default to local mode, but do not override a master supplied via spark-submit.
      .setIfMissing("spark.master", "local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      // Read the parquet input and keep only rows accepted by the user filter.
      val parquetData: DataFrame = sqlContext.read.parquet(ConfigHandler.parquetFilePath)
        .filter(TagUtil.userIfFilterCondition)

      // `.rdd` yields RDD[Row] for both a Spark 1.x DataFrame and a 2.x Dataset[Row].
      parquetData.rdd.mapPartitions { rows =>
        // Pre-aggregate per user inside the partition. Counts must be MERGED:
        // a plain put would keep only the last row seen for each user.
        val perUser = mutable.HashMap.empty[String, Map[String, Int]]
        // One Redis connection per partition, always released even if tagging throws.
        val jedis: Jedis = JPool.getJedis
        try {
          rows.foreach { row =>
            // NOTE(review): assumes batainUserOneId yields a usable key for every row — confirm.
            val userId = TagUtil.batainUserOneId(row)
            val tags = rowTags(row, jedis)
            perUser(userId) = perUser.get(userId).fold(tags)(mergeTags(_, tags))
          }
        } finally {
          jedis.close()
        }
        // Fully materialised above, so iterating after the connection is closed is safe.
        perUser.iterator
      }.reduceByKey((left, right) => mergeTags(left, right))
        // Runs on the executors: output goes to executor stdout (the console only in local mode).
        .foreach(println)
    } finally {
      sc.stop()
    }
  }

  /**
   * Builds the tag set for one row; every tag present on the row counts exactly once.
   *
   * @param row   record from the filtered parquet input
   * @param jedis open Redis connection, used to resolve `appid` through the "appdict" hash
   * @return tag -> 1 for every tag derived from the row
   */
  private def rowTags(row: org.apache.spark.sql.Row, jedis: Jedis): Map[String, Int] = {
    val tags = mutable.Set.empty[String]

    // Ad space type: single digits are zero-padded to two digits (LC01..LC09)
    // so they line up with the two-digit codes (LC10+).
    val adSpaceType = row.getAs[Int]("adspacetype")
    if (adSpaceType > 9) tags += "LC" + adSpaceType
    else if (adSpaceType > 0) tags += "LC0" + adSpaceType

    // Ad space type name.
    val adSpaceTypeName = row.getAs[String]("adspacetypename")
    if (StringUtils.isNotEmpty(adSpaceTypeName)) tags += "LN" + adSpaceTypeName

    // App: prefer the dictionary name looked up by `appid`; fall back to the raw
    // `appname` so a dictionary miss does not drop the app tag.
    val appId = row.getAs[String]("appid")
    val appName = row.getAs[String]("appname")
    val dictName =
      if (StringUtils.isNotEmpty(appId)) Option(jedis.hget("appdict", appId)) else None
    dictName.filter(StringUtils.isNotEmpty(_))
      .orElse(Option(appName).filter(StringUtils.isNotEmpty(_)))
      .foreach(name => tags += "App" + name)

    // Channel (ad platform provider id).
    val channelId = row.getAs[Int]("adplatformproviderid")
    if (channelId > 0) tags += "CN" + channelId

    // Operating system.
    ClientTags.get(row.getAs[Int]("client")).foreach(tags += _)

    // Network type: null-safe, locale-independent upper-casing.
    Option(row.getAs[String]("networkmannername"))
      .flatMap(net => NetworkTags.get(net.toUpperCase(java.util.Locale.ROOT)))
      .foreach(tags += _)

    // Carrier: the names are Chinese (caseless), so they are matched as-is; null-safe.
    Option(row.getAs[String]("ispname")).flatMap(IspTags.get).foreach(tags += _)

    // Keywords: '|'-separated; keep words of length 3 to 8 inclusive; null-safe.
    Option(row.getAs[String]("keywords")).foreach { keywords =>
      keywords.split("\\|")
        .filter(kw => kw.length >= 3 && kw.length <= 8)
        .foreach(kw => tags += "K" + kw)
    }

    // Region: province and city.
    val province = row.getAs[String]("provincename")
    if (StringUtils.isNotEmpty(province)) tags += "ZP" + province
    val city = row.getAs[String]("cityname")
    if (StringUtils.isNotEmpty(city)) tags += "ZC" + city

    tags.iterator.map(_ -> 1).toMap
  }

  /**
   * Sums two tag-count maps key by key.
   *
   * Returns a strict immutable map (unlike a `mapValues` view), so it is cheap to
   * read repeatedly and safe to serialize across the shuffle.
   */
  private def mergeTags(left: Map[String, Int], right: Map[String, Int]): Map[String, Int] =
    right.foldLeft(left) { case (acc, (tag, count)) =>
      acc.updated(tag, acc.getOrElse(tag, 0) + count)
    }
}
